Atomic move #496
Conversation
Looks like there's a lot of dead code that can be removed now: … and lots of others. Feels good to clean it up!
Really excited for this! I played around with it locally and it works well. My only concern is that with a very high number of workers (e.g. we have 12 workers at 50 concurrency each), each worker will now spend a lot more CPU, since it has to listen to global messages from other workers processing jobs. So for every single job processed, all workers need to call getNextJob at least once. But that's probably an OK tradeoff for now.
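A rough sketch of that concern from a single worker's point of view (assumed wiring; the 'added' event name and getNextJob/processJob come from this PR, but the exact hookup below is an illustration, not the actual implementation):

```js
var Queue = require('bull');
var queue = new Queue('my-queue');

// Every worker reacts to the global 'added' message that is published when
// any job is enqueued, so with N workers a single enqueue can trigger up to
// N calls to getNextJob, even though only one worker wins the job.
queue.on('added', function(jobId){
  queue.getNextJob().then(function(job){
    if (job) {
      return queue.processJob(job); // the winning worker processes it
    }
    // all other workers get nothing back and return to waiting
  });
});
```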
lib/queue.js
Outdated
@@ -426,7 +429,10 @@ interface JobOptions
     @param opts: JobOptions Options for this job.
   */
 Queue.prototype.add = function(name, data, opts){
-  return Job.create(this, name, data, opts);
+  var _this = this;
+  return this.isReady().then(function(){
good catch :)
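For context, the revised add presumably ends up roughly like this once the callback is closed; the tail of the hunk is cut off above, so this is a reconstruction rather than the exact code:

```js
Queue.prototype.add = function(name, data, opts){
  var _this = this;
  return this.isReady().then(function(){
    return Job.create(_this, name, data, opts);
  });
};
```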
lib/scripts/moveToActive.lua
Outdated
  ARGV[3] lock duration in milliseconds
]]

local jobId = redis.call("LINDEX", KEYS[1], -1)
Nice! O(1) call to fetch the job.
lib/scripts/moveToFinished.lua
Outdated
Output:
  0 OK
  1 Missing key.
Should be -1?
fixed.
if (lock){
  _this.lock = lock;
}

Job.prototype.takeLock = function(){
This looks like it can be removed - right?
It is only used by a unit test indirectly, will try to get rid of it.
Yeah, would be nice if the unit test implemented it itself. Just to keep the core queue implementation pure and simple.
I don't think takeLock should be tested, since it is meant to be a "private" function; as long as a public function that uses locks is tested, that is enough. A code coverage output for the unit tests should tell us whether takeLock is sufficiently exercised.
yes, it is not tested, but used in some unit test where we need to have a lock: https://github.com/OptimalBits/bull/blob/v3.0.0/test/test_job.js#L100
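If the tests did take the lock themselves, it could be something as small as the helper below. This is only a sketch: the toKey helper, the ':lock' key suffix (taken from the Lua scripts in this PR) and the token argument are assumptions.

```js
// Hypothetical test helper: acquire a job's lock directly in Redis instead of
// going through the "private" Job.prototype.takeLock.
function lockJob(queue, job, token, durationMs){
  var lockKey = queue.toKey(job.id) + ':lock'; // assumed key layout
  // NX: only succeed if the lock is not already held; PX: auto-expire.
  return queue.client.set(lockKey, token, 'PX', durationMs, 'NX');
}

// usage inside a test, before exercising code that expects a held lock:
// lockJob(queue, job, 'test-token', 30000).then(function(){ ... });
```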
lib/scripts/moveToActive.lua
Outdated
end

--[[
  Release lock:
What was this for?
old stuff. removed now.
  this.processJob = this.processJob.bind(this);
  this.getJobFromId = Job.fromId.bind(null, this);
};

util.inherits(Queue, Disturbed);

Queue.ErrorMessages = errors.Messages;
Note this in the changelog?
lib/queue.js
Outdated
@@ -284,11 +293,11 @@ Queue.prototype.isReady = function(){
 }

 Queue.prototype.getJobMoveCount = function(){
-  return this.bclient.commandQueue.length;
+  return this.client.commandQueue.length;
 };

 Queue.prototype.whenCurrentMoveFinished = function(){
What was this for originally? Seems like it can be removed to reduce complexity.
I have removed it.
 * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them
 * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait
 * (e.g. if the job handler keeps crashing), we limit the number of stalled job recoveries to MAX_STALLED_JOB_COUNT.
 *
 * Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten
You might want to remove "This is the most common case" above since it's now the only case.
yes, this has been removed.
lib/scripts.js
Outdated
//
// TODO: DEPRECATE.
//
move: function(job, src, target){
I don't think this is used anymore - remove?
Only used in a unit test. will remove it.
I pushed some updates addressing the code review. Regarding performance, I do not think it will be affected in any measurable way. If you have a busy queue, the published 'added' event will not be used, since the worker just picks new jobs directly after processing the previous one. If the queue is not busy, the extra overhead of the publish is negligible. Besides, in the previous blocking version we also published similar events, so the overhead was already there, and the blocking call timed out after 2.5 seconds, effectively making it a polling queue. Overall the queue is probably faster now; I will run some benchmarks soon. The last thing remaining is a guard timer and logic for handling the queue properly after a reconnection; we need better unit tests for that.
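To make the busy-vs-idle argument concrete, the behaviour described above is roughly the following. This is a simplified sketch, not the actual worker loop, and the local re-emission of the published 'added' message as a queue event is an assumption:

```js
function processJobs(queue){
  return queue.getNextJob().then(function(job){
    if (job) {
      // Busy queue: chain straight into the next fetch after finishing,
      // so the published 'added' messages are never waited on here.
      return queue.processJob(job).then(function(){
        return processJobs(queue);
      });
    }
    // Idle queue: wait until a producer publishes 'added', then try again.
    // One publish per job is negligible overhead when the queue is idle.
    return new Promise(function(resolve){
      queue.once('added', resolve);
    }).then(function(){
      return processJobs(queue);
    });
  });
}
```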
lib/queue.js
Outdated
}, function(err){
  _this.emit('error', err, 'failed to re-obtain lock before moving to failed, bailing');
});
// job.takeLock(true /* renew */, false /* ensureActive */).then( function(/*lock*/) {
remove commented out line
Yes - makes sense. Nice work! Excited for this.
With these latest commits I feel ready to merge into the 3.0.0 branch, and I would like to release the first alpha version of the 3.x series.
Looks good to me to merge.
Thanks for the great effort here @manast! I know you already merged, but hopefully you can take a look at the comments? Thanks again!
  end
end

redis.call("PUBLISH", KEYS[4], jobId)
Is the top documentation correct? It says that this will emit a waiting event if not added, but it looks like it never emits a waiting event; it emits an added event regardless of whether the queue is paused or not.
good catch. I will fix it.
back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait
(e.g. if the job handler keeps crashing), we limit the number of stalled job recoveries to MAX_STALLED_JOB_COUNT.

DEPRECATED CASE:
What does "deprecated case" mean? It is not clear whether this is still an ongoing problem with this move. If it is still an issue, how do we know what exactly happened: a worker dropping jobs because it died vs. a worker failing to start processing the job due to this race condition?
I will remove the comment. With single-instance redis, this case cannot happen anymore since the lock is obtained atomically with the move. In the future, if we want to support redis with replication, we will need to re-introduce redlock, but we will be able to take the lock before moving the job, so in practice the case should not happen either.
@@ -22,20 +26,15 @@ var Job = function(queue, name, data, opts){
    name = '__default__';
  }

  opts = opts || {};
  this.opts = _.extend({}, opts);
Minor thing, no need to address: you could use _.defaults here to simplify some of this code:

```js
this.opts = _.defaults({}, opts, {
  attempts: 1,
  delay: 0,
  timestamp: Date.now()
});

this.delay = this.opts.delay;
this.timestamp = this.opts.timestamp;
```

Probably no need to reassign those to delay or timestamp since we're setting this.opts anyway?
I will update to use _.defaults since it is also semantically better. I feel options should always be kept in the opts field only, so delay should be in opts instead of being a property of the job; however, delay can also be defined when retrying a job, and in that case it is not an option but a property used by the queue mechanics. Timestamp is also a bit special since normally it should be an internal property, but sometimes you want to override it, just like job.id.
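So the constructor would presumably end up along these lines; this is a sketch of the intent described above, not the final code:

```js
opts = opts || {};
this.opts = _.defaults({}, opts, {
  attempts: 1,
  timestamp: Date.now()
});

// delay stays a direct property of the job because it can also be set when a
// job is retried, i.e. it is queue mechanics rather than a plain user option.
this.delay = opts.delay || 0;
this.timestamp = this.opts.timestamp;
```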
    attemptsMade: this.attemptsMade,
    failedReason: this.failedReason,
    stacktrace: this.stacktrace || null,
    returnvalue: this.returnvalue || null
  };
};

Job.prototype.toData = function(){
Minor: this can probably be simplified with:

```js
var whitelist = ['data', 'opts', 'stacktrace', 'returnvalue'];
JSON.stringify(_.pick(this.toJSON(), whitelist));
```
yes, a nice simplification.
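In other words, the method shrinks to roughly this (a sketch based on the suggestion above):

```js
Job.prototype.toData = function(){
  var whitelist = ['data', 'opts', 'stacktrace', 'returnvalue'];
  return JSON.stringify(_.pick(this.toJSON(), whitelist));
};
```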
Output:
   0 OK
  -1 Missing key.
Looking at the lua script here, -1 means both that the job is locked and that the key is missing.
You are right. I added a -2 error code for when a lock is missing. I noticed, however, that these error codes are not being used; I need to fix that as well, it may be hiding errors right now.
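Making the node side actually use those codes could look something like the helper below. This is a hedged sketch only; the function name and error texts are illustrative, not the real scripts.js code:

```js
// Hypothetical handler for a script's numeric return value:
// 0 = OK, -1 = job key missing, -2 = lock missing / held by another worker.
function raiseIfScriptError(code, jobId){
  if (code < 0) {
    var reason = code === -1 ? 'Missing key for job ' + jobId
               : code === -2 ? 'Missing lock for job ' + jobId
               : 'Unknown script error ' + code + ' for job ' + jobId;
    throw new Error(reason);
  }
  return code;
}

// e.g. after running moveToFinished:
// runMoveToFinished(args).then(function(result){ return raiseIfScriptError(result, job.id); });
```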
if redis.call("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
  if ARGV[5] ~= "0" then
    local lockKey = KEYS[3] .. ':lock'
I've noticed that other scripts, such as moveToDelayed, pause, and takeLock, don't check for lock existence, while scripts such as this one do. We should be consistent about how we do lock checking. I know that if these operations are executed it is implied that a lock exists, but even then, we've run into issues in the past due to workers stepping on each other's toes (hopefully this fixes some of those problems!), so I'd vote to always check for locks and have Bull complain loudly when it happens; that way these race condition issues can be better understood and controlled.
I will add an issue for such an improvement.
- job.jobId changed to job.id.
- refactored error messages into separate error module.
- completed and failed job states are now represented in ZSETs.
Could you please add the reasoning behind this move?
- job.jobId -> job.id to reduce redundancy.
- error messages refactored so that we can unit test without needing to copy/paste the messages.
- ZSETs allow us to efficiently get ranges of completed and failed jobs, which is very useful for building UIs and scripts.
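For the ZSET point, the kind of query this enables is roughly the following. The key name and client call here are assumptions (bull prefixes its keys, and the exact score semantics may differ):

```js
// Read a page of completed job ids straight out of the sorted set,
// something a plain SET layout could not do efficiently.
queue.client.zrevrange(queue.toKey('completed'), 0, 9)
  .then(function(jobIds){
    console.log('a page of completed jobs:', jobIds);
  });
```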
FWIW, with respect to the event listeners approach: it is actually not as CPU intensive as a polling alternative. If anything, it can be a source of potential memory leaks when there is an unbounded number of event listeners; nodejs guards against this by warning when more than the default limit of 10 listeners is attached to a single event. The …
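When many consumers really do attach to the same emitter, the standard way to deal with that limit is Node's own EventEmitter API (assuming the queue is an EventEmitter, which util.inherits(Queue, Disturbed) suggests):

```js
var EventEmitter = require('events').EventEmitter;

// Node only warns (it does not fail) once more than 10 listeners are attached
// to a single event; raise the limit explicitly if more are really needed.
queue.setMaxListeners(50);             // for this emitter only
EventEmitter.defaultMaxListeners = 50; // or change the process-wide default
```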
@chuym thanks for the review. I will commit the fixes directly to the 3.0.0 branch.
Implements #480 and #190